# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from ducktape.mark import matrix

from rptest.clients.rpk import TopicSpec
from rptest.services.cluster import cluster
from rptest.services.redpanda import PandaproxyConfig, SISettings, SchemaRegistryConfig
from rptest.services.spark_service import QueryEngineType
from rptest.tests.datalake.datalake_e2e_test import DatalakeServices
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier
from rptest.tests.datalake.utils import supported_storage_types
from rptest.tests.redpanda_test import RedpandaTest
from rptest.transactions.verifiers.compacted_verifier import CompactedVerifier, Workload


class DatalakeTransactionTests(RedpandaTest):
    """Exercises Iceberg (datalake) translation of transactional topics.

    Produces transactional data containing a known fraction of aborted
    batches and verifies that none of the aborted records are visible in
    the resulting Iceberg table.
    """

    def __init__(self, test_ctx, *args, **kwargs):
        super().__init__(
            test_ctx,
            num_brokers=1,
            si_settings=SISettings(test_context=test_ctx),
            extra_rp_conf={
                # NOTE(review): passed as the string "true" rather than a
                # bool — appears to be accepted by the config layer; confirm
                # against other suites that pass booleans here.
                "iceberg_enabled": "true",
                # Commit translated data to the catalog every 5s so the
                # table becomes queryable quickly.
                "iceberg_catalog_commit_interval_ms": 5000,
                "log_compaction_interval_ms": 1000,
            },
            schema_registry_config=SchemaRegistryConfig(),
            pandaproxy_config=PandaproxyConfig(),
            *args,
            **kwargs,
        )
        # Kept for compatibility; the base class also exposes
        # self.test_context, which the test methods use.
        self.test_ctx = test_ctx
        self.topic_name = "test"

    def setUp(self):
        # Deliberately skip the base-class cluster startup: the test body
        # brings up redpanda together with the query engines via
        # DatalakeServices.
        pass

    def verify_no_aborted_batches(
        self,
        dl: DatalakeServices,
        query_engine: QueryEngineType = QueryEngineType.TRINO,
    ):
        """Assert that no aborted-transaction records leaked into the table.

        The producer stamps every aborted record with a known marker
        pattern; query the Iceberg table for that marker and expect zero
        hits. Dumps a few offending offsets before failing, for debugging.
        """
        qe_svc = dl.service(query_engine)
        assert qe_svc, f"No query engine of type {query_engine} found"
        table = f"redpanda.{qe_svc.escape_identifier(self.topic_name)}"
        # This pattern is taken from the format of the abort records
        # generated by the verifier. The verifier injects a known pattern
        # into all aborted records for future verification.
        abort_value_pattern = "'\ta\t'"
        # Single predicate shared by the count query and the diagnostic
        # query below so the two cannot drift apart.
        abort_predicate = (
            f"regexp_like(from_utf8(value), {abort_value_pattern}) = TRUE"
        )
        query = f"select count(*)  from {table} where {abort_predicate}"
        abort_count = qe_svc.run_query_fetch_all(query)[0][0]
        if abort_count > 0:
            # Dump some diagnostics before the assertion below fails the test.
            query = f"select redpanda.offset from {table} where {abort_predicate} limit 5"
            offsets = qe_svc.run_query_fetch_all(query)
            self.redpanda.logger.error(f"A few offsets with aborted records {offsets}")
        assert abort_count == 0, f"{abort_count} aborted records found in iceberg table"

    @cluster(num_nodes=4)
    @matrix(cloud_storage_type=supported_storage_types(), compaction=[False])
    def test_with_transactions(self, cloud_storage_type, compaction):
        """Test ensures correctness of translation when running with transactions.
        Ensures no aborted transactions make it to the iceberg table"""

        tx_verifier = CompactedVerifier(self.test_context, self.redpanda, Workload.TX)
        min_num_records = 1000
        with DatalakeServices(
            self.test_context,
            redpanda=self.redpanda,
            include_query_engines=[QueryEngineType.TRINO],
        ) as dl:
            # Small (1 MiB) segments force frequent rolls so translation
            # spans multiple segments.
            topic_config = dict()
            topic_config[TopicSpec.PROPERTY_SEGMENT_SIZE] = 1 * 1024 * 1024
            if compaction:
                topic_config[TopicSpec.PROPERTY_CLEANUP_POLICY] = (
                    TopicSpec.CLEANUP_COMPACT
                )
            dl.create_iceberg_enabled_topic(self.topic_name, config=topic_config)

            tx_verifier.start()
            # Produces transactional data into the topic with 30% aborted batches
            tx_verifier.remote_start_producer(
                self.redpanda.brokers(), self.topic_name, partitions=1
            )
            # Wait until at least min_num_records have been produced, then
            # shut the producer down cleanly before verification.
            tx_verifier.ensure_progress(delta=min_num_records, timeout_sec=180)
            tx_verifier.remote_stop_producer()
            tx_verifier.remote_wait_producer()
            tx_verifier.stop()

            # Verify the committed offsets made it into the Iceberg table...
            committed_offset_verifier = DatalakeVerifier(
                self.redpanda, self.topic_name, dl.trino()
            )
            committed_offset_verifier.start()
            committed_offset_verifier.wait()
            # ...and that no aborted records did.
            self.verify_no_aborted_batches(dl)
